package com.huan.flink;

import com.huan.flink.map.Product;
import com.huan.flink.map.ProductMapFunction;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * flink 中窗口的使用 - aggregate 增量聚合的使用
 *
 * @author huan.fu
 * @date 2024/1/6 - 11:36
 */
public class FlinkWindowAggregateApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为2
        environment.setParallelism(2);

        environment.socketTextStream("localhost", 9999)
                .map(new ProductMapFunction())
                .keyBy((KeySelector<Product, Integer>) Product::getProductId)
                // 基于 "处理时间" 语义的 滚动 窗口，窗口大小为10s
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                /**
                 * aggregate 增量聚合函数
                 * 1、窗口的第一条数据来，创建窗口，创建累加器
                 * 2、窗口的第一条数据来时，就执行增量聚合操作，调用add方法
                 * 3、增量聚合操作，来一条数据计算一条数据，调用一个add方法
                 * 4、窗口输出时调用一次getResult方法
                 * 5、输入、累加器、输出结果 类型可以不一样
                 */
                .aggregate(
                        /**
                         * 第一个类型：输入数据的类型
                         * 第二个类型：累加器的类型，存储的中间计算结果
                         * 第三个类型：输出的类型
                         */
                        new AggregateFunction<Product, Integer, String>() {
                            /**
                             * 创建累加器，初始化累加器
                             * @return 累加器
                             */
                            @Override
                            public Integer createAccumulator() {
                                System.out.println("初始化累加器");
                                return 0;
                            }

                            /**
                             * 聚合逻辑
                             */
                            @Override
                            public Integer add(Product value, Integer accumulator) {
                                System.out.println("聚合逻辑 add方法调用 value:[" + value + "] current accumulator:[" + accumulator + "]");
                                return ++accumulator;
                            }

                            /**
                             * 获取最终结果，窗口触发时调用
                             */
                            @Override
                            public String getResult(Integer accumulator) {
                                System.out.println("调用getResult方法");
                                return String.valueOf(accumulator);
                            }

                            /**
                             * 只有会话窗口才会触发
                             */
                            @Override
                            public Integer merge(Integer a, Integer b) {
                                System.out.println("调用merge方法");
                                return null;
                            }
                        })
                .print("aggregate");

        environment.execute("window api");
    }
}
